package org.jetlinks.pro.clickhouse.metadata;

import lombok.AllArgsConstructor;
import org.hswebframework.ezorm.rdb.executor.BatchSqlRequest;
import org.hswebframework.ezorm.rdb.executor.SqlRequest;
import org.hswebframework.ezorm.rdb.executor.reactive.ReactiveSqlExecutor;
import org.hswebframework.ezorm.rdb.executor.wrapper.ResultWrapper;
import org.hswebframework.ezorm.rdb.utils.SqlUtils;
import org.jetlinks.pro.clickhouse.ClickHouseOperations;
import org.reactivestreams.Publisher;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

@AllArgsConstructor
public class ClickHouseReactiveSqlExecutor implements ReactiveSqlExecutor {

    private final ClickHouseOperations operations;

    @Override
    public Mono<Integer> update(Publisher<SqlRequest> request) {

        return this
            .execute(request)
            .thenReturn(0);
    }

    @Override
    public Mono<Void> execute(Publisher<SqlRequest> request) {
        return toFlux(request)
            .flatMap(sqlRequest -> {
                String sql = SqlUtils.toNativeSql(sqlRequest.getSql(), sqlRequest.getParameters());
                return operations.execute(sql);
            })
            .then()
            ;
    }

    @Override
    public <E> Flux<E> select(Publisher<SqlRequest> request, ResultWrapper<E, ?> wrapper) {
        return toFlux(request)
            .flatMap(sqlRequest -> {
                String sql = SqlUtils.toNativeSql(sqlRequest.getSql(), sqlRequest.getParameters());
                return operations.query(sql, wrapper);
            });
    }

    protected Flux<SqlRequest> toFlux(Publisher<SqlRequest> request) {

        return Flux
            .from(request)
            .flatMap(sql -> {
                if (sql instanceof BatchSqlRequest) {
                    return Flux.concat(Flux.just(sql), Flux.fromIterable(((BatchSqlRequest) sql).getBatch()));
                }
                return Flux.just(sql);
            })
            .filter(SqlRequest::isNotEmpty);
    }
}
